package com.study.banyiyi.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer component: subscribes to the configured user topic and logs
 * every record it receives.
 *
 * @author yangwm
 * @since 2021/7/26
 */
@Slf4j
@Component
public class KafkaConsumer {

    // Broker address. TODO(review): externalize to configuration (like the topic name)
    // instead of hard-coding localhost.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Topic to consume from, injected from application configuration. */
    @Value("${kafka.topic.user}")
    private String topicName;

    /**
     * Creates a consumer, subscribes to {@code topicName}, and polls forever,
     * logging each record. This method does not return under normal operation;
     * the consumer is closed only when the poll loop exits via an exception.
     */
    public void consume() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-user");
        // Auto-commit offsets every 1s. NOTE(review): auto-commit is at-least-once,
        // NOT exactly-once — records polled but not yet committed may be
        // re-delivered after a rebalance or crash.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // earliest: resume from the committed offset when one exists for the
        // partition; otherwise start from the beginning of the log.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources guarantees the consumer (group membership, TCP
        // connections) is released if subscribe/poll throws — the original
        // never closed it.
        try (org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                     new org.apache.kafka.clients.consumer.KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));

            while (true) {
                // Pull records from the broker, waiting at most 100ms.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // Use the SLF4J logger supplied by @Slf4j instead of System.out,
                // with parameterized formatting.
                records.forEach(record -> log.info(
                        "成功消费消息：topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            }
        }
    }
}
